/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.log.es

import joptsimple.OptionParser
import kafka.admin.BrokerApiVersionsCommand.AdminClient
import kafka.utils.{CommandLineUtils, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment, NewTopic}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.{KafkaFuture, Node, TopicPartition}

import java.nio.charset.StandardCharsets.UTF_8
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
import scala.jdk.CollectionConverters._

/**
 * This is a test that reassigns the partitions of a topic to another broker and then produces to them again.
 * We focus on whether the ElasticLog can be reopened successfully and how long it takes.
 */
object ElasticLogReopenTester {
    /**
     * Command-line entry point.
     *
     * Parses the options, creates the topic, produces the requested number of messages to it, and then
     * reassigns every partition to a different broker and produces one more message per topic.
     *
     * @param args command-line arguments; `--bootstrap-server` is required, all other options have defaults
     */
    def main(args: Array[String]): Unit = {
        val parser = new OptionParser(false)
        val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
            .withOptionalArg
            .describedAs("count")
            .ofType(classOf[java.lang.Long])
            .defaultsTo(10L)
        val messageCompressionOpt = parser.accepts("compression-type", "message compression type")
            .withOptionalArg
            .describedAs("compressionType")
            .ofType(classOf[java.lang.String])
            .defaultsTo("none")
        val brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.")
            .withRequiredArg
            .describedAs("url")
            .ofType(classOf[String])
        val topicOpt = parser.accepts("topic", "The topic to be tested.")
            .withOptionalArg
            .describedAs("topic")
            .ofType(classOf[String])
            .defaultsTo("test-elastic-log-reopen")
        val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic.")
            .withOptionalArg
            .describedAs("partition count")
            .ofType(classOf[java.lang.Integer])
            .defaultsTo(1)
        val replicationOpt = parser.accepts("replication-factor", "The replication factor (in ES layer) for each partition in the topic being created.")
            .withOptionalArg
            .describedAs("replication factor")
            .ofType(classOf[java.lang.Short])
            .defaultsTo(1.toShort)

        val options = parser.parse(args: _*)
        if (args.isEmpty)
            CommandLineUtils.printUsageAndDie(parser, "A tool to test log reopening elastic log. Valid options are: ")

        CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt)

        // parse options
        val messages = options.valueOf(numMessagesOpt).longValue
        val compressionType = options.valueOf(messageCompressionOpt)
        val brokerUrl = options.valueOf(brokerOpt)
        val topic = options.valueOf(topicOpt)
        val partitionNum = options.valueOf(partitionsOpt).intValue
        val replicationFactor = options.valueOf(replicationOpt).shortValue

        // Phase 1: create the topic and fill it with the initial batch of messages.
        createTopics(brokerUrl, Seq(topic), partitionNum, replicationFactor)
        println(s"Producing $messages messages to topic $topic")
        produceMessages(brokerUrl, Array(topic), messages, compressionType)

        // Phase 2: move every partition to another broker and produce again.
        reassignTopicsAndProduce(brokerUrl, Seq(topic), compressionType)
    }

    /**
     * Lists the broker nodes reported by the cluster at `brokerUrl`.
     *
     * @param brokerUrl bootstrap server(s) to connect to
     * @return the nodes that are the keys of the admin client's broker version info
     */
    def getAllAliveBrokers(brokerUrl: String): Seq[Node] = {
        val props = new Properties()
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
        val adminClient = AdminClient.create(props)
        // Close the client even when awaiting or listing the brokers fails, so it is never leaked.
        try {
            adminClient.awaitBrokers()
            // toSeq materializes the node list before the client is closed.
            adminClient.listAllBrokerVersionInfo().keys.toSeq
        } finally adminClient.close()
    }

    /**
     * Reassigns every partition of `topics` to a broker other than its current leader, then produces one
     * message per topic, and prints how long both steps took together.
     *
     * @param brokerUrl       bootstrap server(s) to connect to
     * @param topics          topics whose partitions are reassigned
     * @param compressionType producer compression type used for the follow-up messages
     * @throws RuntimeException if fewer than 2 brokers are found, if a partition does not have exactly one
     *                          in-sync replica, if a partition has no leader, or if no other broker can be
     *                          chosen as the reassignment target
     */
    def reassignTopicsAndProduce(brokerUrl: String, topics: Seq[String], compressionType: String): Unit = {
        val aliveBrokers = getAllAliveBrokers(brokerUrl)
        if (aliveBrokers.size < 2) {
            throw new RuntimeException("There should be at least 2 alive brokers, but found " + aliveBrokers.size + " alive brokers")
        }

        val adminConfig = new Properties
        adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
        val adminClient = Admin.create(adminConfig)
        try {
            val reassignmentMap = new util.LinkedHashMap[TopicPartition, Optional[NewPartitionReassignment]]()
            val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).allTopicNames().get().values().asScala
            topicDescriptions.foreach(topicDescription => {
                topicDescription.partitions()
                    .forEach(partition => {
                        if (partition.isr().size() != 1) {
                            throw new RuntimeException("There should be exactly one isr for " + partition + ", but found " + partition.isr().size())
                        }
                        // leader() is null when the partition currently has no leader; fail with a clear message
                        // instead of a NullPointerException.
                        val currentLeaderId = Option(partition.leader()).map(_.id()).getOrElse(
                            throw new RuntimeException("There is no leader for " + partition))
                        // Pick the first known broker that is not the current leader as the new (sole) replica.
                        val newLeaderId = aliveBrokers.find(_.id() != currentLeaderId).map(_.id()).getOrElse(
                            throw new RuntimeException("Found no alive broker other than " + currentLeaderId + " to reassign " + partition + " to"))
                        println(s"Reassigning partition $partition from $currentLeaderId to broker $newLeaderId")

                        reassignmentMap.put(new TopicPartition(topicDescription.name(), partition.partition()), Optional.of(new NewPartitionReassignment(Seq(newLeaderId).map(Integer.valueOf).asJava)))
                    })
            })

            // reassign partitions
            val startTime = System.currentTimeMillis()
            val results = adminClient.alterPartitionReassignments(reassignmentMap).values().values().toArray(Array.ofDim[KafkaFuture[Void]](0))
            // Wait at most 3 seconds for all futures returned by the reassignment request.
            KafkaFuture.allOf(results: _*).get(3, TimeUnit.SECONDS)

            // produce another message to the topic
            produceMessages(brokerUrl, topics.toArray, 1, compressionType)
            // If you change this output string, the elastic_log_reopen_tester.py system test must be updated too.
            println(s"Reassigning partitions and reproducing took ${System.currentTimeMillis() - startTime} ms")

        } finally adminClient.close()

    }

    /**
     * Creates the given topics and waits until all of them show up in the cluster's topic listing.
     *
     * @param brokerUrl         bootstrap server(s) to connect to
     * @param topics            names of the topics to create
     * @param partitionNum      number of partitions for each topic
     * @param replicationFactor replication factor for each topic
     */
    def createTopics(brokerUrl: String, topics: Seq[String], partitionNum: Int, replicationFactor: Short): Unit = {
        val adminConfig = new Properties
        adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
        val adminClient = Admin.create(adminConfig)

        try {
            val newTopics = topics.map(name => new NewTopic(name, partitionNum, replicationFactor)).asJava
            adminClient.createTopics(newTopics).all.get

            // Updated on every poll so that the failure message can name the topics still missing.
            var pendingTopics: Seq[String] = Seq()
            TestUtils.waitUntilTrue(() => {
                val allTopics = adminClient.listTopics.names.get.asScala.toSeq
                pendingTopics = topics.filter(topicName => !allTopics.contains(topicName))
                pendingTopics.isEmpty
            }, s"timed out waiting for topics : $pendingTopics")

        } finally adminClient.close()
    }

    /**
     * Synchronously produces `messages` records to each of `topics`, cycling through the topics in order.
     *
     * Record `i` (counting from 0 across all topics) uses the UTF-8 bytes of `i.toString` as both key and
     * value. Each send is awaited for at most 1 second before the next record is sent.
     *
     * @param brokerUrl       bootstrap server(s) to connect to
     * @param topics          topics to produce to
     * @param messages        number of records to send to each topic
     * @param compressionType producer compression type
     */
    def produceMessages(brokerUrl: String,
        topics: Array[String],
        messages: Long,
        compressionType: String): Unit = {
        val producerProps = new Properties
        producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
        producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
        val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
        try {
            for (i <- 0L until (messages * topics.length)) {
                // Cycle through the topics in order.
                val topic = topics((i % topics.length).toInt)
                val msg = new ProducerRecord(topic, i.toString.getBytes(UTF_8), i.toString.getBytes(UTF_8))
                producer.send(msg).get(1, TimeUnit.SECONDS)
            }
        } finally {
            producer.close()
        }

    }
}
